b06059f301497132bc1c44e4a049f19863b9fc03,src/main/java/com/conveyal/r5/analyst/broker/BrokerHttpHandler.java,BrokerHttpHandler,service,#Request#Response#,55
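The After Change code serializes a TaskError array as the JSON body of its error responses. The TaskError class itself is not part of this hunk; a minimal sketch consistent with the three-argument constructor used below, with assumed field names, might look like:

// Hypothetical sketch only; the real TaskError class is defined elsewhere and is not shown in this hunk.
public class TaskError {
    public String type;    // assumed: an error classifier (null in the calls below)
    public String message; // assumed: the human-readable message
    public String detail;  // assumed: extra detail (empty string in the calls below)
    public TaskError(String type, String message, String detail) {
        this.type = type;
        this.message = message;
        this.detail = detail;
    }
}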
Before Change
// Together with the HTTP method, path component 1 establishes which action the client wishes to take.
if (pathComponents.length < 2) {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("path should have at least one part");
}
String command = pathComponents[1];
try {
if (request.getMethod() == Method.HEAD) {
/* Let the client know server is alive and URI + request are valid. */
mapper.readTree(request.getInputStream());
response.setStatus(HttpStatus.OK_200);
return;
}
if (request.getMethod() == Method.GET && "jobs".equals(command)) {
/* Fetch status of all jobs. */
List<JobStatus> ret = new ArrayList<>();
broker.jobs.forEach(job -> ret.add(new JobStatus(job)));
// Add a summary of all jobs to the list.
ret.add(new JobStatus(ret));
response.setStatus(HttpStatus.OK_200);
OutputStream os = response.getOutputStream();
mapper.writeValue(os, ret);
os.close();
return;
}
else if (request.getMethod() == Method.GET && "workers".equals(command)) {
/* Report on all known workers. */
response.setStatus(HttpStatus.OK_200);
OutputStream os = response.getOutputStream();
mapper.writeValue(os, broker.workerCatalog.observationsByWorkerId.values());
os.close();
return;
}
else if (request.getMethod() == Method.POST && "dequeue".equals(command)) {
/*
Workers use this command to fetch tasks from a work queue.
They supply their R5 commit, network ID, and a unique worker ID to make sure they always get the
same category of work. Newer workers provide this information (and much more) in a JSON request body.
Older workers will include a simplified version of it in the URL and headers.
The method is POST because unlike GETs (which fetch status) it modifies the task queue on the server.
*/
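// Illustrative examples of the two polling styles (paths inferred from the pathComponents
// indexing below, not taken verbatim from the source):
//   newer worker: POST /dequeue/regional with a WorkerStatus JSON body
//   older worker: POST /dequeue/regional/<networkId>/<workerVersion>
//                 with its worker ID in the AnalystWorker.WORKER_ID_HEADER header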
WorkerStatus workerStatus = JsonUtilities.objectFromRequestBody(request, WorkerStatus.class);
String workType = pathComponents[2]; // Worker specifies single point or regional polling
if (workerStatus == null) {
// Older worker did not supply a JSON body. Fill in the status from URL and headers.
workerStatus = new WorkerStatus();
workerStatus.networks = Sets.newHashSet(pathComponents[3]);
workerStatus.workerVersion = pathComponents[4];
workerStatus.workerId = request.getHeader(AnalystWorker.WORKER_ID_HEADER);
}
// Assume one loaded graph (or a preferred graph at startup) in the current system.
// Add this worker to our catalog, tracking its graph affinity and the last time it was seen.
broker.workerCatalog.catalog(workerStatus);
WorkerCategory category = workerStatus.getWorkerCategory();
if ("single".equals(workType)) {
// Worker is polling for single point tasks.
Broker.WrappedResponse wrappedResponse = new Broker.WrappedResponse(workerStatus.workerId, response);
request.getRequest().getConnection().addCloseListener(
(c, i) -> broker.removeSinglePointChannel(category, wrappedResponse));
// The request object will be shelved and survive after the handler function exits.
response.suspend();
broker.registerSinglePointChannel(category, wrappedResponse);
}
else if ("regional".equals(workType)) {
// Worker is polling for tasks from regional batch jobs.
request.getRequest().getConnection().addCloseListener(
(c, i) -> broker.removeSuspendedResponse(category, response));
// The request object will be shelved and survive after the handler function exits.
response.suspend();
broker.registerSuspendedResponse(category, response);
}
else {
response.setStatus(HttpStatus.NOT_FOUND_404);
response.setDetailMessage("Context not found. Should be either 'single' or 'regional'.");
}
}
else if (request.getMethod() == Method.POST && "enqueue".equals(command)) {
/* The front end wants to enqueue work tasks. */
String workType = pathComponents[2];
if ("single".equals(workType)) {
// Enqueue a single priority task.
GenericClusterRequest task =
mapper.readValue(request.getInputStream(), GenericClusterRequest.class);
if (broker.enqueuePriorityTask(task, response)) {
// Enqueueing the priority task has set its internal taskId.
// TODO move all removal listener registration into the broker functions.
request.getRequest().getConnection()
.addCloseListener((closeable, iCloseType) -> broker.deletePriorityTask(task.taskId));
// The request object will be shelved and survive after the handler function exits.
response.suspend();
}
return;
}
else if ("regional".equals(workType)) {
// Enqueue a list of tasks that all belong to one job.
List<GenericClusterRequest> tasks = mapper.readValue(request.getInputStream(),
new TypeReference<List<GenericClusterRequest>>() { });
// Pre-validate the tasks, checking that they all belong to the same job, graph, and worker version.
GenericClusterRequest exemplar = tasks.get(0);
for (GenericClusterRequest task : tasks) {
if (!task.jobId.equals(exemplar.jobId) ||
!task.graphId.equals(exemplar.graphId) ||
!task.workerVersion.equals(exemplar.workerVersion)) {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("All tasks must be for the same graph, job, and worker commit.");
return;
}
}
broker.enqueueTasks(tasks);
response.setStatus(HttpStatus.ACCEPTED_202);
}
else {
response.setStatus(HttpStatus.NOT_FOUND_404);
response.setDetailMessage("Context not found; should be either 'single' or 'regional'");
}
}
else if (request.getMethod() == Method.POST && "complete".equals(command)) {
// Requests of the form: POST /complete/{success|<httpErrorStatusCode>}/<taskId>
// Mark a specific high-priority task as completed, and record its result.
// We were originally planning to do this with a DELETE request that has a body,
// but that is nonstandard enough to anger many libraries including Grizzly.
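// For illustration (derived from the path parsing below): a worker that computed task 42
// successfully would POST the result body to /complete/success/42, while a worker that
// rejected the request might POST an error JSON to /complete/400/42.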
String successOrError = pathComponents[2];
int taskId = Integer.parseInt(pathComponents[3]);
Response suspendedProducerResponse = broker.deletePriorityTask(taskId);
if (suspendedProducerResponse == null) {
// Signal to the source of the response (the worker) that no one is waiting for that response anymore.
response.setStatus(HttpStatus.NOT_FOUND_404);
return;
}
if ("success".equals(successOrError)) {
// The worker did not find any obvious problems with the request and is streaming back what it
// believes to be a reliable work result.
suspendedProducerResponse.setStatus(HttpStatus.OK_200);
} else {
// The worker is providing an error message because it spotted something wrong with the request.
suspendedProducerResponse.setStatus(Integer.parseInt(successOrError));
suspendedProducerResponse.setCharacterEncoding("utf-8");
suspendedProducerResponse.setContentType("application/json");
}
// Copy the result back to the connection that was the source of the task.
try {
long length = ByteStreams.copy(request.getInputStream(), suspendedProducerResponse.getOutputStream());
LOG.info("Returning {} bytes to high-priority consumer for request {}", length, taskId);
} catch (IOException | IllegalStateException ioex) {
// An IOException happens when the task producer did not wait to retrieve its result.
// Priority task result delivery is not guaranteed, so we don't need to retry.
// This is not an error from the worker's point of view.
// IllegalStateException can happen if we're in offline mode and we've already returned a 202.
}
// Prepare to pipe data back to the original source of the task (the UI) from the worker.
// TODO clarify how it is that we were previously setting the status code of the suspended response after calling ByteStreams.copy. Is the body being buffered?
suspendedProducerResponse.resume();
// Tell source of the POSTed data (the worker) that the broker has handled it with no problems.
response.setStatus(HttpStatus.OK_200);
return;
}
else if (request.getMethod() == Method.DELETE) {
// Used by workers to acknowledge completion of a batch task and remove it from queues,
// avoiding re-delivery. Practically speaking, this means the worker has put the result in a queue.
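// Illustrative forms (derived from the path parsing below):
//   DELETE /tasks/<taskId> -- acknowledge one completed batch task
//   DELETE /jobs/<jobId>   -- delete an entire job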
String context = pathComponents[1];
String id = pathComponents[2];
if ("tasks".equalsIgnoreCase(context)) {
int taskId = Integer.parseInt(id);
// This must not have been a priority task. Try to delete it as a normal job task.
if (broker.markTaskCompleted(taskId)) {
response.setStatus(HttpStatus.OK_200);
} else {
response.setStatus(HttpStatus.NOT_FOUND_404);
}
} else if ("jobs".equals((context))) {
if (broker.deleteJob(id)) {
response.setStatus(HttpStatus.OK_200);
response.setDetailMessage("job deleted");
} else {
response.setStatus(HttpStatus.NOT_FOUND_404);
response.setDetailMessage("job not found");
}
} else {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("Delete is only allowed for tasks and jobs.");
}
} else {
After Change
// Together with the HTTP method, path component 1 establishes which action the client wishes to take.
if (pathComponents.length < 2) {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("path should have at least one part");
TaskError error = new TaskError(null, "Path should have at least one part", "");
OutputStream os = response.getOutputStream();
mapper.writeValue(os, new TaskError[] { error });
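// With the field names assumed in the TaskError sketch above, the serialized body would look
// roughly like: [{"type":null,"message":"Path should have at least one part","detail":""}]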
return;
}
String command = pathComponents[1];
try {
if (request.getMethod() == Method.HEAD) {
/* Let the client know server is alive and URI + request are valid. */
mapper.readTree(request.getInputStream());
response.setStatus(HttpStatus.OK_200);
return;
}
if (request.getMethod() == Method.GET && "jobs".equals(command)) {
/* Fetch status of all jobs. */
List<JobStatus> ret = new ArrayList<>();
broker.jobs.forEach(job -> ret.add(new JobStatus(job)));
// Add a summary of all jobs to the list.
ret.add(new JobStatus(ret));
response.setStatus(HttpStatus.OK_200);
OutputStream os = response.getOutputStream();
mapper.writeValue(os, ret);
os.close();
return;
}
else if (request.getMethod() == Method.GET && "workers".equals(command)) {
/* Report on all known workers. */
response.setStatus(HttpStatus.OK_200);
OutputStream os = response.getOutputStream();
mapper.writeValue(os, broker.workerCatalog.observationsByWorkerId.values());
os.close();
return;
}
else if (request.getMethod() == Method.POST && "dequeue".equals(command)) {
/*
Workers use this command to fetch tasks from a work queue.
They supply their R5 commit, network ID, and a unique worker ID to make sure they always get the
same category of work. Newer workers provide this information (and much more) in a JSON request body.
Older workers will include a simplified version of it in the URL and headers.
The method is POST because unlike GETs (which fetch status) it modifies the task queue on the server.
*/
WorkerStatus workerStatus = JsonUtilities.objectFromRequestBody(request, WorkerStatus.class);
String workType = pathComponents[2]; // Worker specifies single point or regional polling
if (workerStatus == null) {
// Older worker did not supply a JSON body. Fill in the status from URL and headers.
workerStatus = new WorkerStatus();
workerStatus.networks = Sets.newHashSet(pathComponents[3]);
workerStatus.workerVersion = pathComponents[4];
workerStatus.workerId = request.getHeader(AnalystWorker.WORKER_ID_HEADER);
}
// Assume one loaded graph (or a preferred graph at startup) in the current system.
// Add this worker to our catalog, tracking its graph affinity and the last time it was seen.
broker.workerCatalog.catalog(workerStatus);
WorkerCategory category = workerStatus.getWorkerCategory();
if ("single".equals(workType)) {
// Worker is polling for single point tasks.
Broker.WrappedResponse wrappedResponse = new Broker.WrappedResponse(workerStatus.workerId, response);
request.getRequest().getConnection().addCloseListener(
(c, i) -> broker.removeSinglePointChannel(category, wrappedResponse));
// The request object will be shelved and survive after the handler function exits.
response.suspend();
broker.registerSinglePointChannel(category, wrappedResponse);
}
else if ("regional".equals(workType)) {
// Worker is polling for tasks from regional batch jobs.
request.getRequest().getConnection().addCloseListener(
(c, i) -> broker.removeSuspendedResponse(category, response));
// The request object will be shelved and survive after the handler function exits.
response.suspend();
broker.registerSuspendedResponse(category, response);
}
else {
response.setStatus(HttpStatus.NOT_FOUND_404);
response.setDetailMessage("Context not found. Should be either 'single' or 'regional'.");
}
}
else if (request.getMethod() == Method.POST && "enqueue".equals(command)) {
/* The front end wants to enqueue work tasks. */
String workType = pathComponents[2];
if ("single".equals(workType)) {
// Enqueue a single priority task.
GenericClusterRequest task =
mapper.readValue(request.getInputStream(), GenericClusterRequest.class);
if (broker.enqueuePriorityTask(task, response)) {
// Enqueueing the priority task has set its internal taskId.
// TODO move all removal listener registration into the broker functions.
request.getRequest().getConnection()
.addCloseListener((closeable, iCloseType) -> broker.deletePriorityTask(task.taskId));
// The request object will be shelved and survive after the handler function exits.
response.suspend();
}
return;
}
else if ("regional".equals(workType)) {
// Enqueue a list of tasks that all belong to one job.
List<GenericClusterRequest> tasks = mapper.readValue(request.getInputStream(),
new TypeReference<List<GenericClusterRequest>>() { });
// Pre-validate the tasks, checking that they all belong to the same job, graph, and worker version.
GenericClusterRequest exemplar = tasks.get(0);
for (GenericClusterRequest task : tasks) {
if (!task.jobId.equals(exemplar.jobId) ||
!task.graphId.equals(exemplar.graphId) ||
!task.workerVersion.equals(exemplar.workerVersion)) {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("All tasks must be for the same graph, job, and worker commit.");
TaskError error = new TaskError(null, "All tasks must be for the same graph, job, and worker commit.", "");
OutputStream os = response.getOutputStream();
mapper.writeValue(os, new TaskError[] { error });
os.close();
return;
}
}
broker.enqueueTasks(tasks);
response.setStatus(HttpStatus.ACCEPTED_202);
}
else {
response.setStatus(HttpStatus.NOT_FOUND_404);
response.setDetailMessage("Context not found; should be either 'single' or 'regional'");
}
}
else if (request.getMethod() == Method.POST && "complete".equals(command)) {
// Requests of the form: POST /complete/{success|<httpErrorStatusCode>}/<taskId>
// Mark a specific high-priority task as completed, and record its result.
// We were originally planning to do this with a DELETE request that has a body,
// but that is nonstandard enough to anger many libraries including Grizzly.
String successOrError = pathComponents[2];
int taskId = Integer.parseInt(pathComponents[3]);
Response suspendedProducerResponse = broker.deletePriorityTask(taskId);
if (suspendedProducerResponse == null) {
// Signal to the source of the response (the worker) that no one is waiting for that response anymore.
response.setStatus(HttpStatus.NOT_FOUND_404);
return;
}
if ("success".equals(successOrError)) {
// The worker did not find any obvious problems with the request and is streaming back what it
// believes to be a reliable work result.
suspendedProducerResponse.setStatus(HttpStatus.OK_200);
} else {
// The worker is providing an error message because it spotted something wrong with the request.
suspendedProducerResponse.setStatus(Integer.parseInt(successOrError));
suspendedProducerResponse.setCharacterEncoding("utf-8");
suspendedProducerResponse.setContentType("application/json");
}
// Copy the result back to the connection that was the source of the task.
try {
long length = ByteStreams.copy(request.getInputStream(), suspendedProducerResponse.getOutputStream());
LOG.info("Returning {} bytes to high-priority consumer for request {}", length, taskId);
} catch (IOException | IllegalStateException ioex) {
// An IOException happens when the task producer did not wait to retrieve its result.
// Priority task result delivery is not guaranteed, so we don't need to retry.
// This is not an error from the worker's point of view.
// IllegalStateException can happen if we're in offline mode and we've already returned a 202.
}
// Prepare to pipe data back to the original source of the task (the UI) from the worker.
// TODO clarify how it is that we were previously setting the status code of the suspended response after calling ByteStreams.copy. Is the body being buffered?
suspendedProducerResponse.resume();
// Tell source of the POSTed data (the worker) that the broker has handled it with no problems.
response.setStatus(HttpStatus.OK_200);
return;
}
else if (request.getMethod() == Method.DELETE) {
// Used by workers to acknowledge completion of a batch task and remove it from queues,
// avoiding re-delivery. Practically speaking, this means the worker has put the result in a queue.
String context = pathComponents[1];
String id = pathComponents[2];
if ("tasks".equalsIgnoreCase(context)) {
int taskId = Integer.parseInt(id);
// This must not have been a priority task. Try to delete it as a normal job task.
if (broker.markTaskCompleted(taskId)) {
response.setStatus(HttpStatus.OK_200);
} else {
response.setStatus(HttpStatus.NOT_FOUND_404);
}
} else if ("jobs".equals((context))) {
if (broker.deleteJob(id)) {
response.setStatus(HttpStatus.OK_200);
response.setDetailMessage("job deleted");
} else {
response.setStatus(HttpStatus.NOT_FOUND_404);
response.setDetailMessage("job not found");
}
} else {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("Delete is only allowed for tasks and jobs.");
TaskError error = new TaskError(null, "Delete is only allowed for tasks and jobs", "");
OutputStream os = response.getOutputStream();